
Observability: Telemetry and logging for collection and LLM job services #772

Merged
vprashrex merged 6 commits into main from feature/observability-fix
Apr 21, 2026

Conversation


vprashrex (Collaborator) commented Apr 20, 2026

Summary

Target issue is #760

  • Kaapi lacks a full-fledged observability system, making debugging and monitoring difficult.
  • LLM calls run asynchronously via Celery, causing logs to be fragmented between backend and workers.
  • This makes it hard to trace a single request lifecycle and diagnose failures end-to-end.
  • Integration of Sentry with OpenTelemetry enables unified logging, distributed tracing, and complete visibility across backend and Celery.
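
For context, the integration boils down to initializing both SDKs in every process (API and Celery workers). Below is a minimal sketch of such a setup; the exporter choice, DSN variable, and sample rate are illustrative assumptions, and the PR's actual `setup_telemetry()` in `backend/app/core/telemetry.py` is the source of truth:

```python
import os

import sentry_sdk
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter


def setup_telemetry(service_name: str = "kaapi-backend") -> None:
    # Sentry captures errors and performance data from the same process.
    sentry_sdk.init(
        dsn=os.environ.get("SENTRY_DSN", ""),  # assumed env var name
        traces_sample_rate=0.1,  # illustrative sample rate
    )

    # The tracer provider stamps every span with the service name, so backend
    # and Celery worker spans can be told apart within a single trace.
    provider = TracerProvider(
        resource=Resource.create({"service.name": service_name})
    )
    # ConsoleSpanExporter stands in for whatever exporter the deployment uses.
    provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)
```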

Checklist

Before submitting a pull request, please ensure that you mark these tasks.

  • Ran fastapi run --reload app/main.py or docker compose up in the repository root and tested.
  • If you've fixed a bug or added code, ensure it is covered by test cases.

- Integrated OpenTelemetry tracing into collection creation and deletion processes to improve observability.
- Added logging context for better traceability during job execution (see the sketch after this list).
- Refactored job execution methods to include detailed span attributes and error handling.
- Updated callback mechanisms to ensure success and failure responses are properly logged and sent.
- Improved error handling in LLM job execution, including telemetry for provider calls and response handling.
- Updated the lock file to reflect changes in Python version requirements.
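
The logging-context item above refers to attaching job/tenant fields to every record emitted during a job. A hedged sketch of how such a helper can be built with contextvars follows; the PR's real `log_context()` in `backend/app/core/telemetry.py` may differ:

```python
import logging
from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any

_log_ctx: ContextVar[dict[str, Any]] = ContextVar("log_ctx", default={})


@contextmanager
def log_context(**fields: Any) -> Iterator[None]:
    """Merge fields into the ambient context for the duration of the block."""
    token = _log_ctx.set({**_log_ctx.get(), **fields})
    try:
        yield
    finally:
        _log_ctx.reset(token)


class ContextFieldsFilter(logging.Filter):
    """Copies the ambient fields onto each LogRecord so formatters can use them."""

    def filter(self, record: logging.LogRecord) -> bool:
        for key, value in _log_ctx.get().items():
            setattr(record, key, value)
        return True


# Usage: every log emitted inside the block carries job_id/project_id.
# with log_context(job_id="...", project_id=42):
#     logging.getLogger(__name__).info("executing job")
```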

coderabbitai Bot commented Apr 20, 2026

Warning

Rate limit exceeded

@vprashrex has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 54 minutes and 17 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 54 minutes and 17 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 107ab144-2d73-4aa3-8f60-9d2a82e560a5

📥 Commits

Reviewing files that changed from the base of the PR and between 0f1b045 and 43e4feb.

📒 Files selected for processing (1)
  • backend/app/core/telemetry.py
📝 Walkthrough

This PR introduces comprehensive OpenTelemetry instrumentation across the backend, adding tracing configuration, structured logging context, span attributes for API requests and Celery job execution, Sentry integration, and metrics collection for task and LLM activity. Configuration defaults enable OTEL by environment variable, and instrumentation is applied at multiple layers: middleware, API dependencies, service execution, and Celery task handling.
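
As a rough illustration of the middleware layer mentioned here, a sketch of setting span attributes per request is shown below; the class and attribute names are assumptions, and Kaapi's actual middleware in `backend/app/core/middleware.py` differs:

```python
import time

from opentelemetry import trace
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response


class SpanAttributesMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        started = time.perf_counter()
        response = await call_next(request)
        # Annotate the active server span created by the FastAPI instrumentation.
        span = trace.get_current_span()
        span.set_attribute("http.request.method", request.method)
        span.set_attribute("http.response.status_code", response.status_code)
        span.set_attribute(
            "http.server.duration_ms", (time.perf_counter() - started) * 1000
        )
        return response
```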

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **Configuration & Environment**<br>`.env.example`, `backend/app/core/config.py`, `backend/pyproject.toml` | Added OTEL_ENABLED and OTEL_SERVICE_NAME environment variables with defaults, and added OpenTelemetry (fastapi, celery, httpx, requests, logging instrumentation) and Sentry dependencies. |
| **Telemetry & Observability Infrastructure**<br>`backend/app/core/telemetry.py`, `backend/app/core/logger.py`, `backend/app/core/sentry_filters.py` | Created new telemetry module with setup_telemetry(), context logging via log_context(), metrics helpers (record_celery_task_*, record_llm_call_*), DB query/pool instrumentation, and telemetry flushing. Added configure_logging() with service name injection and logger suppression. Created Sentry transaction filter to remove low-signal spans. |
| **Core Instrumentation**<br>`backend/app/main.py`, `backend/app/core/middleware.py`, `backend/app/core/db.py` | Integrated telemetry/Sentry setup in main app initialization. Enhanced middleware to set OTel span attributes (method, route, status, duration) and Sentry tags. Instrumented SQLAlchemy engine at import time via instrument_db_engine(). |
| **API Authentication & Routing**<br>`backend/app/api/deps.py`, `backend/app/api/routes/collections.py`, `backend/app/api/routes/llm.py` | Added span attribute setting for user/tenant context in get_auth_context. Wrapped collection and LLM route handlers with log_context() for structured telemetry; updated delete collection payload to include collection_id in job creation. |
| **Celery Task Infrastructure**<br>`backend/app/celery/celery_app.py`, `backend/app/celery/utils.py`, `backend/app/celery/tasks/job_execution.py` | Renamed worker initialization hook to initialize_worker_process and added Sentry/OTEL setup and telemetry flushing callbacks. Added _enqueue_with_trace_context() helper to inject OTel propagation headers into Celery messages. Updated all task handlers to extract and attach parent OTel context via _run_with_otel_parent() (see the propagation sketch below the table). |
| **Service-level Instrumentation**<br>`backend/app/services/llm/jobs.py`, `backend/app/services/collections/create_collection.py`, `backend/app/services/collections/delete_collection.py` | Wrapped job execution with OTel spans and structured logging contexts. Added nested spans for provider calls, error recording on spans, and record_llm_call_started/finished metrics. Enhanced LLM execution with request/response attributes and a new _execute_provider_call() helper. |
| **Observability Utilities**<br>`backend/app/core/langfuse/langfuse.py` | Renamed extract_output_value to extract_response_output and relaxed type annotations for input/metadata parameters from Dict to Any/dict. |
| **Tests**<br>`backend/app/tests/services/collections/*` | Updated existing tests to use pytest.raises for exception assertions and added new test cases for provider deletion failure and provider factory acquisition failure scenarios. |
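
The trace-propagation pair called out in the Celery row reduces to the standard OTel inject/extract pattern. A sketch under that assumption (the function names here are placeholders, not the PR's actual helpers):

```python
from collections.abc import Callable
from typing import Any, TypeVar

from opentelemetry import context as otel_context
from opentelemetry.propagate import extract, inject

T = TypeVar("T")


def headers_with_trace_context() -> dict[str, str]:
    """Producer side: serialize the current trace context into message headers."""
    headers: dict[str, str] = {}
    inject(headers)  # writes W3C traceparent/tracestate entries
    return headers


def run_with_parent_context(headers: dict[str, Any], fn: Callable[[], T]) -> T:
    """Worker side: attach the propagated context so new spans join the trace."""
    parent = extract(headers)
    token = otel_context.attach(parent)
    try:
        return fn()
    finally:
        otel_context.detach(token)
```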

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

ready-for-review

Suggested reviewers

  • Prajna1999
  • kartpop
  • AkhileshNegi

Poem

🐰 Traces now flutter through the code with grace,
Each span and context find their proper place,
Telemetry blooms in every request,
From API calls to jobs put to the test,
Observability hops along the way! 🌟

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 43.43%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The pull request title accurately describes the main change: enhancing telemetry and logging across collection and LLM job services with OpenTelemetry and Sentry integration. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.


coderabbitai Bot left a comment


Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
backend/app/services/collections/delete_collection.py (1)

29-35: ⚠️ Potential issue | 🟡 Minor

Align start_job’s return type with the returned value.

The function is annotated as str, but Line 65 returns the UUID object passed in as collection_job_id.

Suggested fix
 def start_job(
     db: Session,
     request: DeletionRequest,
     project_id: int,
     collection_job_id: UUID,
     organization_id: int,
-) -> str:
+) -> UUID:

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

Also applies to: 65-65

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/collections/delete_collection.py` around lines 29 - 35,
The function start_job currently declares a return type of str but actually
returns the UUID object collection_job_id; update the signature to return UUID
(e.g., change "-> str" to "-> UUID") and ensure UUID is imported (from uuid
import UUID) so the return type matches the returned value; verify any callers
or tests expecting a str are adjusted if necessary.
backend/app/api/routes/collections.py (1)

88-92: ⚠️ Potential issue | 🟡 Minor

Add return annotations to the changed endpoints.

These handlers now have changed bodies but still omit return types; annotate them with the response wrapper type.

Suggested fix
 def create_collection(
     session: SessionDep,
     current_user: AuthContextDep,
     request: CreationRequest,
-):
+) -> APIResponse[CollectionJobImmediatePublic]:
@@
 def delete_collection(
     session: SessionDep,
     current_user: AuthContextDep,
     collection_id: UUID = FastPath(description="Collection to delete"),
     request: CallbackRequest | None = Body(default=None),
-):
+) -> APIResponse[CollectionJobImmediatePublic]:

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

Also applies to: 156-160

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/routes/collections.py` around lines 88 - 92, The endpoint
handlers (notably create_collection and the other handler defined around lines
156-160) lack return type annotations; add explicit return type hints using the
project's response wrapper type (the same wrapper used elsewhere for API
handlers) so signatures become e.g. def create_collection(... ) ->
ResponseWrapper[CreationResponse]: and likewise annotate the second handler with
its appropriate wrapper/response type; update imports if needed to reference the
response wrapper and the concrete response DTO types referenced in the function
bodies.
backend/app/services/collections/create_collection.py (1)

39-46: ⚠️ Potential issue | 🟡 Minor

Align start_job’s return type with the returned value.

The function is annotated as str, but Line 76 returns collection_job_id as a UUID.

Suggested fix
 def start_job(
     db: Session,
     request: CreationRequest,
     project_id: int,
     collection_job_id: UUID,
     with_assistant: bool,
     organization_id: int,
-) -> str:
+) -> UUID:

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

Also applies to: 76-76

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/collections/create_collection.py` around lines 39 - 46,
The function start_job is annotated to return str but actually returns
collection_job_id (a UUID); update the return type annotation from str to UUID
and ensure UUID is imported/used from uuid (or typing) so the signature reads
start_job(... ) -> UUID; verify all parameters remain fully typed and adjust any
callers or type checks expecting str if necessary.
backend/app/api/routes/llm.py (1)

53-55: ⚠️ Potential issue | 🟡 Minor

Add the missing return type for llm_call.

The changed endpoint should declare its APIResponse[LLMJobImmediatePublic] return type.

Suggested fix
 def llm_call(
     _current_user: AuthContextDep, session: SessionDep, request: LLMCallRequest
-):
+) -> APIResponse[LLMJobImmediatePublic]:

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/routes/llm.py` around lines 53 - 55, The llm_call endpoint is
missing an explicit return type; update the function signature for llm_call to
declare it returns APIResponse[LLMJobImmediatePublic] (i.e., def
llm_call(_current_user: AuthContextDep, session: SessionDep, request:
LLMCallRequest) -> APIResponse[LLMJobImmediatePublic]:). Ensure the referenced
types (APIResponse and LLMJobImmediatePublic) are imported if not already.
🧹 Nitpick comments (5)
backend/app/core/config.py (1)

138-139: Default mismatch with .env.example.

OTEL_ENABLED defaults to False here but .env.example sets OTEL_ENABLED=true. Consider aligning (either default True here or false in the example) to avoid confusion about the out-of-the-box behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/core/config.py` around lines 138 - 139, The default for
OTEL_ENABLED in config.py (OTEL_ENABLED: bool = False) conflicts with
.env.example which sets OTEL_ENABLED=true; update one to match: either change
OTEL_ENABLED in config.py to True (OTEL_ENABLED: bool = True) to reflect the
example, or change the .env.example entry to false—make the chosen source of
truth consistent and keep OTEL_SERVICE_NAME unchanged.
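
For illustration, a minimal pydantic-settings sketch of the precedence at play (the class shape is assumed; only the field names mirror the PR), showing why the code default only matters when the env var is absent:

```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    OTEL_ENABLED: bool = False  # code default: wins only if the env var is unset
    OTEL_SERVICE_NAME: str = "kaapi-backend"


# With OTEL_ENABLED=true in the environment (or a loaded .env), the env value
# overrides the default; otherwise deployments silently run with OTEL disabled.
print(Settings().OTEL_ENABLED)
```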
backend/app/celery/tasks/job_execution.py (1)

19-56: Type the new OTel helper boundary.

The new helpers leave task_instance, fn, and the return value untyped. Add a generic callable type so task return types are preserved.

Suggested fix
 import logging
+from collections.abc import Callable
+from typing import Any, TypeVar
 
 from asgi_correlation_id import correlation_id
@@
 logger = logging.getLogger(__name__)
+_T = TypeVar("_T")
@@
-def _extract_parent_context(task_instance) -> otel_context.Context:
+def _extract_parent_context(task_instance: Any) -> otel_context.Context:
@@
-def _run_with_otel_parent(task_instance, fn):
+def _run_with_otel_parent(task_instance: Any, fn: Callable[[], _T]) -> _T:

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/celery/tasks/job_execution.py` around lines 19 - 56, The helper
functions _extract_parent_context and _run_with_otel_parent lack proper type
hints for task_instance, fn, and the return value; add typing to preserve task
return types by importing TypeVar and Callable, declare a TypeVar R for the
return type, type task_instance as the appropriate Celery Task/Any (or Task
request type) and type fn as Callable[[], R], and annotate _run_with_otel_parent
to return R; also update _extract_parent_context to return otel_context.Context
(already present) and ensure any use-sites match the new generic signature.
backend/app/services/llm/jobs.py (1)

129-173: Inconsistency with start_job: no span created here.

start_job (Line 81-86) wraps the job creation + task scheduling in tracer.start_as_current_span("llm.start_job") and records exceptions / attributes on the span. start_chain_job only opens log_context(...) and skips the tracer span entirely, which means chain-start failures don't surface as a span in Sentry AI Insights / OTel traces the same way LLM-call starts do. If this is intentional, ignore; otherwise mirror the pattern so the two entry points have parity.
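
A sketch of what mirroring start_job's pattern could look like (the attribute names and wrapper shape are assumptions, not the PR's code):

```python
from collections.abc import Callable

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)


def start_chain_job_traced(job_id: str, create_and_schedule: Callable[[], str]) -> str:
    with tracer.start_as_current_span("llm.start_chain_job") as span:
        span.set_attribute("job.id", job_id)
        try:
            task_id = create_and_schedule()
            span.set_attribute("celery.task_id", task_id)
            return task_id
        except Exception as exc:
            # Record the failure so chain-start errors surface in traces too.
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            raise
```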

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 129 - 173, start_chain_job
currently only uses log_context and omits creating an OpenTelemetry/Sentry span
like start_job does; wrap the critical section in
tracer.start_as_current_span("llm.start_chain_job") (matching the pattern used
in start_job) around job creation + start_llm_chain_job, add the same
span.set_attribute calls and exception recording (including setting span.status
on error) so chain-start failures appear in traces, and ensure to record the
task_id and job_id on the span; locate start_chain_job and mirror the span usage
and exception handling logic from start_job to implement this.
backend/app/core/telemetry.py (1)

8-8: Minor: address ruff hints (UP035, B010).

  • Line 8: Iterator should be imported from collections.abc per UP035.
  • Line 558: ruff B010 — setattr(engine, "_kaapi_db_telemetry_instrumented", True) is equivalent to plain attribute assignment.
♻️ Proposed fix
-from typing import TYPE_CHECKING, Any, Iterator
+from collections.abc import Iterator
+from typing import TYPE_CHECKING, Any
...
-    setattr(engine, "_kaapi_db_telemetry_instrumented", True)
+    engine._kaapi_db_telemetry_instrumented = True  # type: ignore[attr-defined]

Also applies to: 558-558

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/core/telemetry.py` at line 8, Update the imports and attribute
assignment to address ruff hints: replace Iterator import from typing with
importing Iterator from collections.abc (leave TYPE_CHECKING and Any as-is) so
change the import line that currently lists "TYPE_CHECKING, Any, Iterator"; and
replace the setattr call that sets "_kaapi_db_telemetry_instrumented" on the
engine (the line using setattr(engine, "_kaapi_db_telemetry_instrumented",
True)) with a direct attribute assignment
engine._kaapi_db_telemetry_instrumented = True to satisfy B010; keep the same
attribute name and effect.
backend/app/celery/utils.py (1)

16-23: Add type hint for task and surface apply_async failures.

Two small concerns in the new helper:

  1. The task parameter has no type annotation. As per coding guidelines: "Always add type hints to all function parameters and return values in Python code".
  2. apply_async(...) can raise broker errors (e.g., connection refused, OperationalError). Currently every start_*_job(...) wrapper catches these via try/except (see start_job in jobs.py, Line 101-120), which is fine — just worth noting that _enqueue_with_trace_context is not idempotent and a retry-after-partial-failure path would re-inject OTel headers. Not actionable here, just a note for callers.
♻️ Proposed type-hint fix
-def _enqueue_with_trace_context(task, **kwargs) -> str:
+def _enqueue_with_trace_context(task: "Task", **kwargs: Any) -> str:
     """Publish Celery task with explicit trace context headers."""
     otel_headers: dict[str, str] = {}
     inject(otel_headers)
     celery_headers = dict(otel_headers)
     celery_headers["otel"] = otel_headers
     async_result = task.apply_async(kwargs=kwargs, headers=celery_headers)
     return async_result.id

And at the top of the file:

+from typing import Any, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from celery import Task

As per coding guidelines: "Always add type hints to all function parameters and return values in Python code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/celery/utils.py` around lines 16 - 23, Add an explicit type hint
for the task parameter (use celery.app.task.Task) and keep the return type as
str; then wrap the call to task.apply_async(...) in a try/except to surface
broker/apply_async failures with context: catch Exception as e and re-raise a
new RuntimeError (or custom exception) that includes the task identity (e.g.,
task.name) and chain the original exception via "from e" so callers can handle
or log the underlying broker error; update any imports if needed (e.g., from
celery.app.task import Task).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/api/deps.py`:
- Around line 52-59: Remove PII from OpenTelemetry span attributes by stopping
the direct setting of auth_context.user.email and, unless approved,
auth_context.organization.name and auth_context.project.name on spans; keep only
non-PII identifiers (span.set_attribute("user.id", ...),
span.set_attribute("tenant.org_id", ...),
span.set_attribute("tenant.project_id", ...)) or, if you need correlation,
replace the direct name/email values with a deterministic hash or masked value
before calling span.set_attribute for keys "user.email", "tenant.org_name", and
"tenant.project_name".

In `@backend/app/api/routes/llm.py`:
- Around line 162-170: The code warns when llm_call.usage is missing but still
constructs Usage(**llm_call.usage) which will raise; update the handler that
builds LLMCallResponse (the block creating LLMCallResponse with
response=llm_response, usage=Usage(**llm_call.usage), provider_raw_response=...)
to first guard llm_call.usage and either (A) return a controlled 4xx/5xx error
indicating missing usage (including job_id and project_id in the error/log) or
(B) populate a safe default Usage instance (e.g., zeros/nulls) before passing it
into LLMCallResponse; ensure the chosen approach is used consistently where
get_llm_call / LLMCallResponse and Usage are referenced so the endpoint no
longer raises when llm_call.usage is None.

In `@backend/app/core/logger.py`:
- Around line 43-68: The logging guard currently prevents reconfiguring when
configure_logging is called again, causing an old service name to persist;
update configure_logging to be service-name aware by storing the configured
service name on the root logger (e.g., add a
root_logger._kaapi_logging_service_name) and, if _kaapi_logging_configured is
True but the provided service_name (resolved_service_name) differs from
root_logger._kaapi_logging_service_name, replace or update the ServiceNameFilter
instances on existing handlers (or recreate handlers) so they use the new
resolved_service_name; ensure to update the stored _kaapi_logging_service_name
when changes are applied and keep existing use of CorrelationIdFilter and
_kaapi_logging_configured.

In `@backend/app/core/telemetry.py`:
- Around line 351-363: The loop currently checks only for key presence but may
call span.set_attribute with None; update the logic in the block that iterates
over ("gen_ai.request.temperature", "temperature") etc. to only call
span.set_attribute(attr_key, params.get(param_key)) when params.get(param_key)
is not None (i.e., guard against None values), and similarly change the tools
handling so span.set_attribute("gen_ai.request.available_tools",
json.dumps(tools)) is only executed when tools is not None and non-empty; ensure
these checks are done around the existing span.set_attribute usage so the
OpenTelemetry attributes are never set to None.

In `@backend/app/services/collections/create_collection.py`:
- Around line 279-306: The except block in create_collection.execute_job
currently logs the error, attempts provider cleanup, marks the job failed via
_mark_job_failed, and sends the failure callback (build_failure_payload +
send_callback) but then returns normally; to ensure Celery records the task as
failed, re-raise the original exception at the end of that except block (use a
bare "raise" to preserve the original traceback) immediately after sending the
callback and after _mark_job_failed completes, so the exception propagated by
span.record_exception and logger.error is not swallowed.

In `@backend/app/services/collections/delete_collection.py`:
- Around line 236-245: The exception is currently swallowed after calling
span.record_exception/ span.set_status and _mark_job_failed_and_callback, so
Celery treats the task as successful; modify the except block in the function
containing span.record_exception to re-raise the caught exception after the
failure callback: keep the existing calls to span.record_exception(err),
span.set_status(...), and _mark_job_failed_and_callback(project_id=project_id,
collection_id=collection_id, job_id=job_uuid, err=err,
callback_url=deletion_request.callback_url) and then add a plain "raise" to
re-raise the original exception so upstream task failure handling (e.g., Celery
task_failure telemetry) runs.

In `@backend/app/services/llm/jobs.py`:
- Around line 599-601: The per-LLM-call synchronous
flush_telemetry(timeout_millis=10000) in execute_llm_call is causing additive
tail latency for chain jobs; remove this unconditional 10s flush and instead
either (a) delete the call so you rely on the existing outer finally blocks and
task_postrun flush, or (b) replace the literal with a config-driven short
timeout (e.g., read TELEMETRY_PER_LLM_FLUSH_MS and default to 0/500) so the
flush is off or uses a tiny timeout by default; update execute_llm_call to
reference that config variable and ensure behavior remains backward-compatible.
- Around line 891-911: The nested except currently re-uses the outer exception
variable e, hiding the real DB/update error; modify the inner except in
execute_chain_job to catch a new variable (e.g., update_err) instead of reusing
e, and change the logger.error there to include that new variable and its
exc_info so the update_llm_chain_status failure is logged (reference
update_llm_chain_status, ChainStatus.FAILED, logger.error, and the
Session(engine) context).
- Around line 720-751: The log message in execute_job uses inconsistent
formatting for callback_url and always logs "Error if any..." at INFO level;
update the logger call that currently formats f"...callback_url
{callback_url_str}" to use an equals sign like callback_url={callback_url_str}
to match other fields, and change the post-execution logging that references
result.error so it only emits a log when result.error is truthy (e.g., if
result.error: logger.error(...)) or else demote it to debug (logger.debug(...))
— adjust the calls around the execute_llm_call invocation and the subsequent
logger usage where result and logger are referenced.

---

Outside diff comments:
In `@backend/app/api/routes/collections.py`:
- Around line 88-92: The endpoint handlers (notably create_collection and the
other handler defined around lines 156-160) lack return type annotations; add
explicit return type hints using the project's response wrapper type (the same
wrapper used elsewhere for API handlers) so signatures become e.g. def
create_collection(... ) -> ResponseWrapper[CreationResponse]: and likewise
annotate the second handler with its appropriate wrapper/response type; update
imports if needed to reference the response wrapper and the concrete response
DTO types referenced in the function bodies.

In `@backend/app/api/routes/llm.py`:
- Around line 53-55: The llm_call endpoint is missing an explicit return type;
update the function signature for llm_call to declare it returns
APIResponse[LLMJobImmediatePublic] (i.e., def llm_call(_current_user:
AuthContextDep, session: SessionDep, request: LLMCallRequest) ->
APIResponse[LLMJobImmediatePublic]:). Ensure the referenced types (APIResponse
and LLMJobImmediatePublic) are imported if not already.

In `@backend/app/services/collections/create_collection.py`:
- Around line 39-46: The function start_job is annotated to return str but
actually returns collection_job_id (a UUID); update the return type annotation
from str to UUID and ensure UUID is imported/used from uuid (or typing) so the
signature reads start_job(... ) -> UUID; verify all parameters remain fully
typed and adjust any callers or type checks expecting str if necessary.

In `@backend/app/services/collections/delete_collection.py`:
- Around line 29-35: The function start_job currently declares a return type of
str but actually returns the UUID object collection_job_id; update the signature
to return UUID (e.g., change "-> str" to "-> UUID") and ensure UUID is imported
(from uuid import UUID) so the return type matches the returned value; verify
any callers or tests expecting a str are adjusted if necessary.

---

Nitpick comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 19-56: The helper functions _extract_parent_context and
_run_with_otel_parent lack proper type hints for task_instance, fn, and the
return value; add typing to preserve task return types by importing TypeVar and
Callable, declare a TypeVar R for the return type, type task_instance as the
appropriate Celery Task/Any (or Task request type) and type fn as Callable[[],
R], and annotate _run_with_otel_parent to return R; also update
_extract_parent_context to return otel_context.Context (already present) and
ensure any use-sites match the new generic signature.

In `@backend/app/celery/utils.py`:
- Around line 16-23: Add an explicit type hint for the task parameter (use
celery.app.task.Task) and keep the return type as str; then wrap the call to
task.apply_async(...) in a try/except to surface broker/apply_async failures
with context: catch Exception as e and re-raise a new RuntimeError (or custom
exception) that includes the task identity (e.g., task.name) and chain the
original exception via "from e" so callers can handle or log the underlying
broker error; update any imports if needed (e.g., from celery.app.task import
Task).

In `@backend/app/core/config.py`:
- Around line 138-139: The default for OTEL_ENABLED in config.py (OTEL_ENABLED:
bool = False) conflicts with .env.example which sets OTEL_ENABLED=true; update
one to match: either change OTEL_ENABLED in config.py to True (OTEL_ENABLED:
bool = True) to reflect the example, or change the .env.example entry to
false—make the chosen source of truth consistent and keep OTEL_SERVICE_NAME
unchanged.

In `@backend/app/core/telemetry.py`:
- Line 8: Update the imports and attribute assignment to address ruff hints:
replace Iterator import from typing with importing Iterator from collections.abc
(leave TYPE_CHECKING and Any as-is) so change the import line that currently
lists "TYPE_CHECKING, Any, Iterator"; and replace the setattr call that sets
"_kaapi_db_telemetry_instrumented" on the engine (the line using setattr(engine,
"_kaapi_db_telemetry_instrumented", True)) with a direct attribute assignment
engine._kaapi_db_telemetry_instrumented = True to satisfy B010; keep the same
attribute name and effect.

In `@backend/app/services/llm/jobs.py`:
- Around line 129-173: start_chain_job currently only uses log_context and omits
creating an OpenTelemetry/Sentry span like start_job does; wrap the critical
section in tracer.start_as_current_span("llm.start_chain_job") (matching the
pattern used in start_job) around job creation + start_llm_chain_job, add the
same span.set_attribute calls and exception recording (including setting
span.status on error) so chain-start failures appear in traces, and ensure to
record the task_id and job_id on the span; locate start_chain_job and mirror the
span usage and exception handling logic from start_job to implement this.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c64e3285-4258-4801-9752-d5a56c557a8f

📥 Commits

Reviewing files that changed from the base of the PR and between d512861 and de1b099.

⛔ Files ignored due to path filters (1)
  • backend/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • .env.example
  • .gitignore
  • backend/app/api/deps.py
  • backend/app/api/routes/collections.py
  • backend/app/api/routes/llm.py
  • backend/app/celery/celery_app.py
  • backend/app/celery/tasks/job_execution.py
  • backend/app/celery/utils.py
  • backend/app/core/config.py
  • backend/app/core/db.py
  • backend/app/core/langfuse/langfuse.py
  • backend/app/core/logger.py
  • backend/app/core/middleware.py
  • backend/app/core/sentry_filters.py
  • backend/app/core/telemetry.py
  • backend/app/main.py
  • backend/app/services/collections/create_collection.py
  • backend/app/services/collections/delete_collection.py
  • backend/app/services/llm/jobs.py

Comment thread backend/app/api/deps.py Outdated
Comment thread backend/app/api/routes/llm.py Outdated
Comment on lines +43 to +68
def configure_logging(service_name: str | None = None) -> None:
    root_logger = logging.getLogger()
    if getattr(root_logger, "_kaapi_logging_configured", False):
        return

    root_logger.setLevel(LOGGING_LEVEL)

    formatter = logging.Formatter(LOGGING_FORMAT)
    resolved_service_name = service_name or settings.OTEL_SERVICE_NAME

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    stream_handler.addFilter(CorrelationIdFilter())
    stream_handler.addFilter(ServiceNameFilter(resolved_service_name))

    file_handler = RotatingFileHandler(
        LOG_FILE_PATH, maxBytes=10 * 1024 * 1024, backupCount=5
    )
    file_handler.setFormatter(formatter)
    file_handler.addFilter(CorrelationIdFilter())
    file_handler.addFilter(ServiceNameFilter(resolved_service_name))

    root_logger.handlers.clear()
    root_logger.addHandler(stream_handler)
    root_logger.addHandler(file_handler)
    root_logger._kaapi_logging_configured = True

⚠️ Potential issue | 🟡 Minor

Make the logging guard service-name aware.

configure_logging(service_name="kaapi-celery") is ignored once _kaapi_logging_configured is set, so a process that configured logging earlier keeps the wrong service_name in every log record.

Suggested fix
 def configure_logging(service_name: str | None = None) -> None:
     root_logger = logging.getLogger()
-    if getattr(root_logger, "_kaapi_logging_configured", False):
+    resolved_service_name = service_name or settings.OTEL_SERVICE_NAME
+    if (
+        getattr(root_logger, "_kaapi_logging_configured", False)
+        and getattr(root_logger, "_kaapi_logging_service_name", None)
+        == resolved_service_name
+    ):
         return
 
     root_logger.setLevel(LOGGING_LEVEL)
 
     formatter = logging.Formatter(LOGGING_FORMAT)
-    resolved_service_name = service_name or settings.OTEL_SERVICE_NAME
 
     stream_handler = logging.StreamHandler()
@@
     root_logger.addHandler(stream_handler)
     root_logger.addHandler(file_handler)
     root_logger._kaapi_logging_configured = True
+    root_logger._kaapi_logging_service_name = resolved_service_name
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/core/logger.py` around lines 43 - 68, The logging guard currently
prevents reconfiguring when configure_logging is called again, causing an old
service name to persist; update configure_logging to be service-name aware by
storing the configured service name on the root logger (e.g., add a
root_logger._kaapi_logging_service_name) and, if _kaapi_logging_configured is
True but the provided service_name (resolved_service_name) differs from
root_logger._kaapi_logging_service_name, replace or update the ServiceNameFilter
instances on existing handlers (or recreate handlers) so they use the new
resolved_service_name; ensure to update the stored _kaapi_logging_service_name
when changes are applied and keep existing use of CorrelationIdFilter and
_kaapi_logging_configured.

Comment on lines +351 to +363
    for attr_key, param_key in (
        ("gen_ai.request.temperature", "temperature"),
        ("gen_ai.request.max_tokens", "max_tokens"),
        ("gen_ai.request.top_p", "top_p"),
        ("gen_ai.request.presence_penalty", "presence_penalty"),
        ("gen_ai.request.frequency_penalty", "frequency_penalty"),
    ):
        if param_key in params:
            span.set_attribute(attr_key, params.get(param_key))

    tools = params.get("tools")
    if tools is not None:
        span.set_attribute("gen_ai.request.available_tools", json.dumps(tools))

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

Does OpenTelemetry Python span.set_attribute raise or silently drop when value is None?

💡 Result:

OpenTelemetry Python span.set_attribute neither raises an exception nor silently drops the attribute when the value is None. Instead, the behavior is undefined, as explicitly stated in the official documentation and API specification. The documentation strongly discourages passing None values, noting: "Note: The behavior of None value attributes is undefined, and hence strongly discouraged." This applies to both set_attribute(key, value) and set_attributes(attributes). The OpenTelemetry specification confirms that null attribute values are invalid, with attempting to set null being undefined behavior. In practice, SDK implementations (like NonRecordingSpan) treat it as a no-op without raising, but users should avoid None values to ensure portability and correctness across implementations and exporters.


🏁 Script executed:

# Check the code context around lines 351-363
sed -n '340,375p' backend/app/core/telemetry.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 1640


🏁 Script executed:

# Search for how params are constructed and if None values are passed
rg -n "temperature\s*=" backend/app/core/telemetry.py | head -20

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 57


🏁 Script executed:

# Look for params construction and usage context
rg -B 10 "gen_ai.request.temperature" backend/app/core/telemetry.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 588


Check for None values in params before setting OpenTelemetry span attributes.

param_key in params only guards existence, not value. If a caller stores temperature=None (common in config defaults), set_attribute("gen_ai.request.temperature", None) passes undefined behavior per OpenTelemetry specification—behavior is not guaranteed across implementations and exporters. This violates the spec requirement that attribute values be valid.

🛡️ Proposed fix
     for attr_key, param_key in (
         ("gen_ai.request.temperature", "temperature"),
         ("gen_ai.request.max_tokens", "max_tokens"),
         ("gen_ai.request.top_p", "top_p"),
         ("gen_ai.request.presence_penalty", "presence_penalty"),
         ("gen_ai.request.frequency_penalty", "frequency_penalty"),
     ):
-        if param_key in params:
-            span.set_attribute(attr_key, params.get(param_key))
+        value = params.get(param_key)
+        if value is not None:
+            span.set_attribute(attr_key, value)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/core/telemetry.py` around lines 351 - 363, The loop currently
checks only for key presence but may call span.set_attribute with None; update
the logic in the block that iterates over ("gen_ai.request.temperature",
"temperature") etc. to only call span.set_attribute(attr_key,
params.get(param_key)) when params.get(param_key) is not None (i.e., guard
against None values), and similarly change the tools handling so
span.set_attribute("gen_ai.request.available_tools", json.dumps(tools)) is only
executed when tools is not None and non-empty; ensure these checks are done
around the existing span.set_attribute usage so the OpenTelemetry attributes are
never set to None.

Comment thread backend/app/services/collections/create_collection.py
Comment thread backend/app/services/collections/delete_collection.py
Comment thread backend/app/services/llm/jobs.py Outdated
Comment on lines +720 to 751
        logger.info(
            f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, callback_url {callback_url_str}"
        )

        try:
            with Session(engine) as session:
                job_crud = JobCrud(session=session)
                job_crud.update(
                    job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
                )

                langfuse_credentials = get_provider_credential(
                    session=session,
                    org_id=organization_id,
                    project_id=project_id,
                    provider="langfuse",
                )

                result = execute_llm_call(
                    config=request.config,
                    query=request.query,
                    job_id=job_uuid,
                    project_id=project_id,
                    organization_id=organization_id,
                    request_metadata=request.request_metadata,
                    langfuse_credentials=langfuse_credentials,
                    include_provider_raw_response=request.include_provider_raw_response,
                )

                logger.info(
                    f"[execute_job] Error if any during execution of job: {result.error}"
                )

                if result.success:
                    callback_response = APIResponse.success_response(
                        data=result.response, metadata=result.metadata

⚠️ Potential issue | 🟡 Minor

Two log-quality nits in execute_job.

  • Line 721: callback_url {callback_url_str} is missing the = separator used elsewhere (job_id={...}, task_id={...}). Reads oddly in log aggregators.
  • Line 749-751: This log fires on both success and failure paths and prints Error if any during execution of job: None on every successful call, at INFO level. Consider logging it only when result.error is truthy, or demoting to DEBUG.
♻️ Proposed fix
         logger.info(
-            f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, callback_url {callback_url_str}"
+            f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, callback_url={callback_url_str}"
         )
...
-            logger.info(
-                f"[execute_job] Error if any during execution of job: {result.error}"
-            )
+            if result.error:
+                logger.warning(
+                    f"[execute_job] LLM call returned error | job_id={job_id}, error={result.error}"
+                )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
        logger.info(
            f"[execute_job] Starting LLM job execution | job_id={job_id}, task_id={task_id}, callback_url={callback_url_str}"
        )

        try:
            with Session(engine) as session:
                job_crud = JobCrud(session=session)
                job_crud.update(
                    job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
                )

                langfuse_credentials = get_provider_credential(
                    session=session,
                    org_id=organization_id,
                    project_id=project_id,
                    provider="langfuse",
                )

                result = execute_llm_call(
                    config=request.config,
                    query=request.query,
                    job_id=job_uuid,
                    project_id=project_id,
                    organization_id=organization_id,
                    request_metadata=request.request_metadata,
                    langfuse_credentials=langfuse_credentials,
                    include_provider_raw_response=request.include_provider_raw_response,
                )

                if result.error:
                    logger.warning(
                        f"[execute_job] LLM call returned error | job_id={job_id}, error={result.error}"
                    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 720 - 751, The log message in
execute_job uses inconsistent formatting for callback_url and always logs "Error
if any..." at INFO level; update the logger call that currently formats
f"...callback_url {callback_url_str}" to use an equals sign like
callback_url={callback_url_str} to match other fields, and change the
post-execution logging that references result.error so it only emits a log when
result.error is truthy (e.g., if result.error: logger.error(...)) or else demote
it to debug (logger.debug(...)) — adjust the calls around the execute_llm_call
invocation and the subsequent logger usage where result and logger are
referenced.

Comment thread backend/app/services/llm/jobs.py

codecov Bot commented Apr 20, 2026

vprashrex requested a review from AkhileshNegi April 20, 2026 14:34

coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
backend/app/services/collections/delete_collection.py (2)

29-65: ⚠️ Potential issue | 🟡 Minor

Fix the start_job return type.

Line 35 says -> str, but Line 65 returns collection_job_id, which is a UUID. Align the annotation with the implementation unless callers require a string. As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

🛠️ Proposed fix
-) -> str:
+) -> UUID:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/collections/delete_collection.py` around lines 29 - 65,
The function start_job currently annotates its return as str but returns
collection_job_id (a UUID); fix by making the annotation match the
implementation or vice versa: either change the return type annotation on
start_job to -> UUID (and ensure UUID is imported) if callers expect a UUID, or
return str(collection_job_id) and update any callers/types accordingly;
reference the start_job function, collection_job_id variable, and the return
statement to locate the change.

143-149: ⚠️ Potential issue | 🟠 Major

Isolate callback failures from deletion state handling.

Line 234 can raise after the collection has already been deleted and marked successful, causing the broad except to mark the job failed. Line 149 can also raise inside _mark_job_failed_and_callback, preventing the original exception from being re-raised at Line 246.

🛡️ Proposed fix
     if callback_url and collection_job:
         failure_payload = build_failure_payload(
             collection_job=collection_job,
             collection_id=collection_id,
             error_message=str(err),
         )
-        send_callback(callback_url, failure_payload)
+        try:
+            send_callback(callback_url, failure_payload)
+        except Exception:
+            logger.warning(
+                "[delete_collection.execute_job] Failure callback failed | job_id=%s",
+                str(job_id),
+                exc_info=True,
+            )
@@
             if deletion_request.callback_url and collection_job:
                 success_payload = build_success_payload(
                     collection_job=collection_job,
                     collection_id=collection_id,
                 )
-                send_callback(deletion_request.callback_url, success_payload)
+                try:
+                    send_callback(deletion_request.callback_url, success_payload)
+                except Exception:
+                    logger.warning(
+                        "[delete_collection.execute_job] Success callback failed | job_id=%s",
+                        str(job_uuid),
+                        exc_info=True,
+                    )

Also applies to: 229-246

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/collections/delete_collection.py` around lines 143 -
149, The callback sending and failure-marking logic (calls to send_callback and
build_failure_payload and the helper _mark_job_failed_and_callback) must be
isolated so errors in sending callbacks do not change the deletion job state or
swallow the original exception: wrap send_callback and the code path that
builds/sends failure callbacks in their own try/except blocks, log any
callback/send errors but do not call _mark_job_failed_and_callback from inside
the same broad exception path that handles the deletion result, and ensure that
any exception raised during deletion is re-raised after attempts to notify;
likewise protect _mark_job_failed_and_callback itself with its own try/except so
its failures are logged and do not prevent the original exception from being
re-raised. Ensure you reference and modify the code around send_callback,
build_failure_payload, and _mark_job_failed_and_callback to implement these
isolated try/excepts and re-raise the original exception at the end of the outer
exception handler.
backend/app/services/llm/jobs.py (2)

494-583: ⚠️ Potential issue | 🟠 Major

Finalize LLM metrics on all provider failure paths.

Line 501 returns provider setup errors before any llm.call.errors metric is emitted. After Line 512, unexpected exceptions jump to the outer except at Line 681 and also skip record_llm_call_finished(...). This undercounts provider/config failures in the new telemetry.

🛡️ Proposed direction
+            provider_name = str(completion_config.provider)
+            model_name = str(completion_config.params.get("model") or "")
+            operation = "chat"
+            record_llm_call_started(
+                provider=provider_name,
+                model=model_name,
+                operation=operation,
+                organization_id=organization_id,
+                project_id=project_id,
+            )
+            provider_started_at = time.perf_counter()
+            call_finished_recorded = False
             try:
                 provider_instance = get_llm_provider(
@@
             except ValueError as ve:
+                record_llm_call_finished(
+                    provider=provider_name,
+                    model=model_name,
+                    operation=operation,
+                    duration_ms=(time.perf_counter() - provider_started_at) * 1000,
+                    error=True,
+                    organization_id=organization_id,
+                    project_id=project_id,
+                )
+                call_finished_recorded = True
                 return BlockResult(error=str(ve), llm_call_id=llm_call_id)

Then guard the outer except with the same initialized locals so unhandled provider exceptions emit one error metric before returning.

Also applies to: 668-689

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 494 - 583, The provider setup
and execution error paths (get_llm_provider ValueError, inner try ValueError,
and any exceptions that bubble to the outer except) return before calling
record_llm_call_finished, undercounting errors; update the failure exits to
always call record_llm_call_finished(provider=provider_name, model=model_name,
operation="chat", duration_ms=(time.perf_counter()-provider_started_at)*1000,
error=True, organization_id=organization_id, project_id=project_id) before
returning and ensure provider_started_at, provider_name and model_name are
initialized for the outer except path (e.g. set defaults before provider lookup)
so the outer exception handler can emit the same metric and then return the
BlockResult (also keep existing ai_span.set_status calls where applicable).

182-198: ⚠️ Potential issue | 🟠 Major

Make callback delivery best-effort and avoid recording raw callback URLs.

Line 184 and Line 756 put the full callback URL into spans; these URLs often carry tenant-specific paths or secret query tokens. Also, handle_job_error sends the callback before marking the job failed, and the success path sends the callback before marking the job successful. A callback outage can therefore prevent the correct DB status update or flip a successful LLM execution into a failed job.

🛡️ Proposed direction
-            cb_span.set_attribute("callback.url", callback_url)
+            cb_span.set_attribute("callback.configured", True)
             cb_span.set_attribute("callback.status", "failure")
-            send_callback(
-                callback_url=callback_url,
-                data=callback_response.model_dump(),
-            )
+            try:
+                send_callback(
+                    callback_url=callback_url,
+                    data=callback_response.model_dump(),
+                )
+            except Exception:
+                logger.warning(
+                    f"[handle_job_error] Failed to send failure callback | job_id={job_id}",
+                    exc_info=True,
+                )

Apply the same pattern in the success branch after persisting JobStatus.SUCCESS.

Also applies to: 754-779

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 182 - 198, The span currently
records the full callback URL and the code sends callbacks before updating DB
state; change both success and failure flows to update the job status first via
JobCrud.update(JobUpdate(...)) and then attempt a best-effort send_callback
inside a try/except so callback failures do not prevent DB updates, and remove
the raw URL from the span (use an obfuscated value or just record the
host/domain or a hash under tracer.start_as_current_span("llm.send_callback")
instead of the full callback_url) while recording any send error in the span or
logs; apply this pattern to the failure branch shown (send_callback,
cb_span.set_attribute) and the analogous success branch so send_callback is
post-update and wrapped in exception handling.
backend/app/services/collections/create_collection.py (1)

39-76: ⚠️ Potential issue | 🟡 Minor

Fix the start_job return type.

Line 46 says -> str, but Line 76 returns collection_job_id, which is a UUID. Align the annotation with the implementation unless callers require a string. As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

🛠️ Proposed fix
-) -> str:
+) -> UUID:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/collections/create_collection.py` around lines 39 - 76,
The function start_job has a return type annotation of -> str but actually
returns collection_job_id (a UUID); update the signature to return UUID (use the
same UUID type used for the collection_job_id parameter) so the annotation
matches the implementation, i.e., change -> str to -> UUID in start_job and
ensure UUID is imported where needed; alternatively, if callers expect a string,
convert the returned value to str(collection_job_id) before returning—make this
consistent with usages of start_job in the codebase (refer to start_job,
collection_job_id, and start_create_collection_job).
♻️ Duplicate comments (1)
backend/app/core/telemetry.py (1)

356-368: ⚠️ Potential issue | 🟡 Minor

Avoid setting undefined OpenTelemetry attribute values.

Line 363 only checks key presence, so temperature=None still reaches span.set_attribute(...); Line 367 also records empty tools values. Keep these attributes unset unless the value is meaningful. This is a duplicate of the earlier review finding on this same block.

🛡️ Proposed fix
     for attr_key, param_key in (
         ("gen_ai.request.temperature", "temperature"),
         ("gen_ai.request.max_tokens", "max_tokens"),
         ("gen_ai.request.top_p", "top_p"),
         ("gen_ai.request.presence_penalty", "presence_penalty"),
         ("gen_ai.request.frequency_penalty", "frequency_penalty"),
     ):
-        if param_key in params:
-            span.set_attribute(attr_key, params.get(param_key))
+        value = params.get(param_key)
+        if value is not None:
+            span.set_attribute(attr_key, value)
 
     tools = params.get("tools")
-    if tools is not None:
+    if tools:
         span.set_attribute("gen_ai.request.available_tools", json.dumps(tools))
OpenTelemetry Python Span.set_attribute behavior when attribute value is None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/core/telemetry.py` around lines 356 - 368, The telemetry code
currently sets OpenTelemetry attributes even when values are None or empty;
update the loop over ("gen_ai.request.temperature", ...) in the function/method
using span so that you read value = params.get(param_key) and only call
span.set_attribute(attr_key, value) when value is not None (and for
numeric/boolean semantics consider explicitly checking value is not None), and
change the tools handling (variable tools = params.get("tools")) to only set
"gen_ai.request.available_tools" when tools is truthy/non-empty (e.g., if tools:
span.set_attribute(..., json.dumps(tools))) to avoid recording undefined or
empty attributes.
🧹 Nitpick comments (3)
backend/app/core/telemetry.py (2)

514-559: Add parameter type hints to the SQLAlchemy event callbacks.

These nested listener functions return None, but their parameters are untyped. Any is fine here because SQLAlchemy callback argument types are dynamic. As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

♻️ Proposed fix
     @event.listens_for(engine, "before_cursor_execute")
     def _before_cursor_execute(
-        conn, cursor, statement, parameters, context, executemany
+        conn: Any,
+        cursor: Any,
+        statement: Any,
+        parameters: Any,
+        context: Any,
+        executemany: bool,
     ) -> None:
@@
     @event.listens_for(engine, "after_cursor_execute")
     def _after_cursor_execute(
-        conn, cursor, statement, parameters, context, executemany
+        conn: Any,
+        cursor: Any,
+        statement: Any,
+        parameters: Any,
+        context: Any,
+        executemany: bool,
     ) -> None:
@@
     @event.listens_for(engine, "handle_error")
-    def _handle_error(exception_context) -> None:
+    def _handle_error(exception_context: Any) -> None:
@@
     @event.listens_for(engine.pool, "checkout")
-    def _on_checkout(dbapi_connection, connection_record, connection_proxy) -> None:
+    def _on_checkout(
+        dbapi_connection: Any,
+        connection_record: Any,
+        connection_proxy: Any,
+    ) -> None:
@@
     @event.listens_for(engine.pool, "checkin")
-    def _on_checkin(dbapi_connection, connection_record) -> None:
+    def _on_checkin(dbapi_connection: Any, connection_record: Any) -> None:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/core/telemetry.py` around lines 514 - 559, Update the SQLAlchemy
event listener functions to add parameter type hints (use typing.Any for dynamic
SQLAlchemy args) and explicit return type None: _before_cursor_execute(conn:
Any, cursor: Any, statement: Any, parameters: Any, context: Any, executemany:
Any) -> None, _after_cursor_execute(conn: Any, cursor: Any, statement: Any,
parameters: Any, context: Any, executemany: Any) -> None,
_handle_error(exception_context: Any) -> None, _on_checkout(dbapi_connection:
Any, connection_record: Any, connection_proxy: Any) -> None, and
_on_checkin(dbapi_connection: Any, connection_record: Any) -> None; also add
"from typing import Any" at the top of the module if not present.

563-563: Use direct assignment for the instrumentation marker.

Ruff is right here: constant-name setattr is less clear than direct assignment. Keep the attr-defined suppression if type checking complains about engine: object.

♻️ Proposed fix
-    setattr(engine, "_kaapi_db_telemetry_instrumented", True)
+    engine._kaapi_db_telemetry_instrumented = True  # type: ignore[attr-defined]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/core/telemetry.py` at line 563, Replace the setattr call that
sets the instrumentation marker with a direct attribute assignment on the engine
object: change setattr(engine, "_kaapi_db_telemetry_instrumented", True) to
engine._kaapi_db_telemetry_instrumented = True; if type checkers complain about
engine being typed as object, retain the existing attr-defined suppression
comment so static checks remain satisfied; ensure you update the code around the
instrumentation logic that references engine and the
"_kaapi_db_telemetry_instrumented" attribute.
backend/app/services/llm/jobs.py (1)

55-63: Type the provider callable explicitly.

Line 57 leaves func untyped, and Line 60 uses a bare dict. This helper is on the LLM execution path, so the callable contract should be explicit. As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

♻️ Proposed fix
+from collections.abc import Callable
@@
 def _execute_provider_call(
     *,
-    func,
+    func: Callable[..., tuple[Any, Any]],
     completion_config: Any,
     query: QueryParams,
-    credentials: dict | None,
+    credentials: dict[str, Any] | None,
     session_id: str | None,
     **kwargs: Any,
 ) -> tuple[Any, Any]:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 55 - 63, The helper
_execute_provider_call has an untyped callable and a bare dict; update its
signature to type func as a Callable that returns the same tuple[Any, Any] (e.g.
Callable[..., tuple[Any, Any]] or a more specific Callable[[Any, QueryParams,
dict[str, Any] | None, str | None], tuple[Any, Any]]), and change credentials
from bare dict | None to dict[str, Any] | None (or Mapping[str, Any] | None) and
add the necessary imports (Callable, Mapping) from typing. Ensure the updated
type hints are used for func, credentials, and any other parameters so the
function signature and return annotation are fully typed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/services/collections/create_collection.py`:
- Around line 276-307: The exception handler and the success/failure callback
invocations must not let callback failures change the collection job outcome:
wrap the call to send_callback(creation_request.callback_url, success_payload)
and the later send_callback(..., failure_payload) each in their own try/except
that catches and logs the callback error (include context like collection_job_id
and callback URL) but does NOT re-raise; when handling the original exception in
the except block, ensure you preserve and re-raise the original exception (err)
and do not let any exception from the failure callback mask it—i.e., call
send_callback inside a try/except that logs warnings and continues, then call
_mark_job_failed(...) and finally re-raise the original err; also apply the same
non-fatal try/except around provider.delete(result) cleanup so cleanup/callback
failures don't overwrite job failure handling.

---

Outside diff comments:
In `@backend/app/services/collections/create_collection.py`:
- Around line 39-76: The function start_job has a return type annotation of ->
str but actually returns collection_job_id (a UUID); update the signature to
return UUID (use the same UUID type used for the collection_job_id parameter) so
the annotation matches the implementation, i.e., change -> str to -> UUID in
start_job and ensure UUID is imported where needed; alternatively, if callers
expect a string, convert the returned value to str(collection_job_id) before
returning—make this consistent with usages of start_job in the codebase (refer
to start_job, collection_job_id, and start_create_collection_job).

In `@backend/app/services/collections/delete_collection.py`:
- Around line 29-65: The function start_job currently annotates its return as
str but returns collection_job_id (a UUID); fix by making the annotation match
the implementation or vice versa: either change the return type annotation on
start_job to -> UUID (and ensure UUID is imported) if callers expect a UUID, or
return str(collection_job_id) and update any callers/types accordingly;
reference the start_job function, collection_job_id variable, and the return
statement to locate the change.
- Around line 143-149: The callback sending and failure-marking logic (calls to
send_callback and build_failure_payload and the helper
_mark_job_failed_and_callback) must be isolated so errors in sending callbacks
do not change the deletion job state or swallow the original exception: wrap
send_callback and the code path that builds/sends failure callbacks in their own
try/except blocks, log any callback/send errors but do not call
_mark_job_failed_and_callback from inside the same broad exception path that
handles the deletion result, and ensure that any exception raised during
deletion is re-raised after attempts to notify; likewise protect
_mark_job_failed_and_callback itself with its own try/except so its failures are
logged and do not prevent the original exception from being re-raised. Ensure
you reference and modify the code around send_callback, build_failure_payload,
and _mark_job_failed_and_callback to implement these isolated try/excepts and
re-raise the original exception at the end of the outer exception handler.

In `@backend/app/services/llm/jobs.py`:
- Around line 494-583: The provider setup and execution error paths
(get_llm_provider ValueError, inner try ValueError, and any exceptions that
bubble to the outer except) return before calling record_llm_call_finished,
undercounting errors; update the failure exits to always call
record_llm_call_finished(provider=provider_name, model=model_name,
operation="chat", duration_ms=(time.perf_counter()-provider_started_at)*1000,
error=True, organization_id=organization_id, project_id=project_id) before
returning and ensure provider_started_at, provider_name and model_name are
initialized for the outer except path (e.g. set defaults before provider lookup)
so the outer exception handler can emit the same metric and then return the
BlockResult (also keep existing ai_span.set_status calls where applicable).
- Around line 182-198: The span currently records the full callback URL and the
code sends callbacks before updating DB state; change both success and failure
flows to update the job status first via JobCrud.update(JobUpdate(...)) and then
attempt a best-effort send_callback inside a try/except so callback failures do
not prevent DB updates, and remove the raw URL from the span (use an obfuscated
value or just record the host/domain or a hash under
tracer.start_as_current_span("llm.send_callback") instead of the full
callback_url) while recording any send error in the span or logs; apply this
pattern to the failure branch shown (send_callback, cb_span.set_attribute) and
the analogous success branch so send_callback is post-update and wrapped in
exception handling.

---

Duplicate comments:
In `@backend/app/core/telemetry.py`:
- Around line 356-368: The telemetry code currently sets OpenTelemetry
attributes even when values are None or empty; update the loop over
("gen_ai.request.temperature", ...) in the function/method using span so that
you read value = params.get(param_key) and only call
span.set_attribute(attr_key, value) when value is not None (and for
numeric/boolean semantics consider explicitly checking value is not None), and
change the tools handling (variable tools = params.get("tools")) to only set
"gen_ai.request.available_tools" when tools is truthy/non-empty (e.g., if tools:
span.set_attribute(..., json.dumps(tools))) to avoid recording undefined or
empty attributes.

---

Nitpick comments:
In `@backend/app/core/telemetry.py`:
- Around line 514-559: Update the SQLAlchemy event listener functions to add
parameter type hints (use typing.Any for dynamic SQLAlchemy args) and explicit
return type None: _before_cursor_execute(conn: Any, cursor: Any, statement: Any,
parameters: Any, context: Any, executemany: Any) -> None,
_after_cursor_execute(conn: Any, cursor: Any, statement: Any, parameters: Any,
context: Any, executemany: Any) -> None, _handle_error(exception_context: Any)
-> None, _on_checkout(dbapi_connection: Any, connection_record: Any,
connection_proxy: Any) -> None, and _on_checkin(dbapi_connection: Any,
connection_record: Any) -> None; also add "from typing import Any" at the top of
the module if not present.
- Line 563: Replace the setattr call that sets the instrumentation marker with a
direct attribute assignment on the engine object: change setattr(engine,
"_kaapi_db_telemetry_instrumented", True) to
engine._kaapi_db_telemetry_instrumented = True; if type checkers complain about
engine being typed as object, retain the existing attr-defined suppression
comment so static checks remain satisfied; ensure you update the code around the
instrumentation logic that references engine and the
"_kaapi_db_telemetry_instrumented" attribute.

In `@backend/app/services/llm/jobs.py`:
- Around line 55-63: The helper _execute_provider_call has an untyped callable
and a bare dict; update its signature to type func as a Callable that returns
the same tuple[Any, Any] (e.g. Callable[..., tuple[Any, Any]] or a more specific
Callable[[Any, QueryParams, dict[str, Any] | None, str | None], tuple[Any,
Any]]), and change credentials from bare dict | None to dict[str, Any] | None
(or Mapping[str, Any] | None) and add the necessary imports (Callable, Mapping)
from typing. Ensure the updated type hints are used for func, credentials, and
any other parameters so the function signature and return annotation are fully
typed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a42435d5-b514-4923-8154-ae1f2e20560c

📥 Commits

Reviewing files that changed from the base of the PR and between de1b099 and dea7c4d.

⛔ Files ignored due to path filters (1)
  • backend/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
  • backend/app/api/deps.py
  • backend/app/api/routes/llm.py
  • backend/app/core/telemetry.py
  • backend/app/services/collections/create_collection.py
  • backend/app/services/collections/delete_collection.py
  • backend/app/services/llm/jobs.py
  • backend/pyproject.toml
✅ Files skipped from review due to trivial changes (1)
  • backend/pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (2)
  • backend/app/api/deps.py
  • backend/app/api/routes/llm.py

Comment on lines +276 to +307
            collection_crud.create(collection)
            collection = collection_crud.read_one(collection.id)

            if flat_docs:
                DocumentCollectionCrud(session).create(collection, flat_docs)

            collection_job_crud = CollectionJobCrud(session, project_id)
            collection_job = collection_job_crud.update(
                collection_job.id,
                CollectionJobUpdate(
                    status=CollectionJobStatus.SUCCESSFUL,
                    collection_id=collection.id,
                ),
            )

            success_payload = build_success_payload(collection_job, collection)

            elapsed = time.time() - start_time
            logger.info(
                "[create_collection.execute_job] Collection created: %s | Time: %.2fs | Files: %d | Total Size: %s MB | Types: %s",
                collection_id,
                elapsed,
                len(flat_docs),
                collection_job.total_size_mb,
                list(file_exts),
            )

            if creation_request.callback_url:
                send_callback(creation_request.callback_url, success_payload)

        except Exception as err:
            span.record_exception(err)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(err)))
            logger.error(
                "[create_collection.execute_job] Collection Creation Failed | {'collection_job_id': '%s', 'error': '%s'}",
                job_id,
                str(err),
                exc_info=True,
            )

            if provider is not None and result is not None:
                try:
                    provider.delete(result)
                except Exception:
                    logger.warning(
                        "[create_collection.execute_job] Provider cleanup failed"
                    )

            collection_job = _mark_job_failed(
                project_id=project_id,
                job_id=job_id,
                err=err,
                collection_job=collection_job,
            )

            if creation_request and creation_request.callback_url and collection_job:
                failure_payload = build_failure_payload(collection_job, str(err))
                send_callback(creation_request.callback_url, failure_payload)
            raise

⚠️ Potential issue | 🟠 Major

Do not let callback failures change the collection job outcome.

Line 277 is inside the main creation try: if the success callback raises after the collection is created and persisted, the except marks the job failed and may delete the provider resource. Line 306 has the inverse problem: failure-callback errors can mask the original exception before the bare raise.

🛡️ Proposed fix
             if creation_request.callback_url:
-                send_callback(creation_request.callback_url, success_payload)
+                try:
+                    send_callback(creation_request.callback_url, success_payload)
+                except Exception:
+                    logger.warning(
+                        "[create_collection.execute_job] Success callback failed | collection_job_id=%s",
+                        job_id,
+                        exc_info=True,
+                    )
@@
             if creation_request and creation_request.callback_url and collection_job:
                 failure_payload = build_failure_payload(collection_job, str(err))
-                send_callback(creation_request.callback_url, failure_payload)
+                try:
+                    send_callback(creation_request.callback_url, failure_payload)
+                except Exception:
+                    logger.warning(
+                        "[create_collection.execute_job] Failure callback failed | collection_job_id=%s",
+                        job_id,
+                        exc_info=True,
+                    )
             raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/collections/create_collection.py` around lines 276 -
307, The exception handler and the success/failure callback invocations must not
let callback failures change the collection job outcome: wrap the call to
send_callback(creation_request.callback_url, success_payload) and the later
send_callback(..., failure_payload) each in their own try/except that catches
and logs the callback error (include context like collection_job_id and callback
URL) but does NOT re-raise; when handling the original exception in the except
block, ensure you preserve and re-raise the original exception (err) and do not
let any exception from the failure callback mask it—i.e., call send_callback
inside a try/except that logs warnings and continues, then call
_mark_job_failed(...) and finally re-raise the original err; also apply the same
non-fatal try/except around provider.delete(result) cleanup so cleanup/callback
failures don't overwrite job failure handling.

@vprashrex vprashrex self-assigned this Apr 20, 2026
@vprashrex vprashrex added the enhancement New feature or request label Apr 20, 2026
@vprashrex vprashrex moved this to In Review in Kaapi-dev Apr 20, 2026
@vprashrex vprashrex linked an issue Apr 20, 2026 that may be closed by this pull request
Comment thread backend/app/api/routes/collections.py Outdated
Comment thread .gitignore Outdated
Comment thread backend/app/main.py
Comment on lines +4 to +9
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.httpx import HttpxIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.starlette import StarletteIntegration
Collaborator

this can be grouped

Collaborator Author

@vprashrex vprashrex Apr 21, 2026

this can't be grouped, since each integration class is imported from its own specific submodule
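
For reference, the import statements themselves cannot be merged because each integration class lives in its own submodule; what can be consolidated is the list handed to sentry_sdk.init. A minimal sketch (placeholder DSN):

import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.fastapi import FastApiIntegration

# No single "from sentry_sdk.integrations import ..." statement exposes these
# classes together; the grouping happens in the integrations list instead.
sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder
    integrations=[CeleryIntegration(), FastApiIntegration()],
)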

Comment on lines +818 to +833
    with log_context(
        tag="llm-chain",
        lifecycle="llm.chain.execute_job",
        job_id=job_uuid,
        task_id=task_id,
        project_id=project_id,
        organization_id=organization_id,
        total_blocks=len(request.blocks),
    ):
        logger.info(
            f"[execute_chain_job] Starting chain execution | "
            f"job_id={job_uuid}, total_blocks={len(request.blocks)}"
        )

        try:
            with Session(engine) as session:
                chain_record = create_llm_chain(
                    session,
Collaborator

too much nesting here: with -> try -> with
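
One way to flatten this is contextlib.ExitStack, which lets the log context and the DB session enter through a single with. A sketch using the names from the snippet above (their signatures are assumed, not verified):

from contextlib import ExitStack

def execute_chain_job_flat(job_uuid: str) -> None:
    # Both context managers enter through one with-statement, so the original
    # with -> try -> with ladder becomes with -> try.
    with ExitStack() as stack:
        stack.enter_context(log_context(tag="llm-chain", job_id=job_uuid))
        session = stack.enter_context(Session(engine))
        try:
            create_llm_chain(session, job_uuid)
        except Exception:
            logger.exception("[execute_chain_job] chain creation failed")
            raise

On Python 3.10+, a parenthesized with (log_context(...), Session(engine) as session): achieves the same flattening without ExitStack.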

Comment thread backend/app/celery/celery_app.py Outdated
Comment thread backend/app/services/llm/jobs.py Outdated
usage = response.usage
if usage:
    ai_span.set_attribute("llm.usage.total_tokens", usage.total_tokens)
    ai_span.set_attribute("kaapi_llm_input_tokens", usage.input_tokens)
Collaborator

do we need a prefixed name like kaapi_llm_input_tokens, or would plain llm_input_tokens work?
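
If the aim is names that downstream tooling already understands, the OpenTelemetry GenAI semantic conventions reserve attributes for token usage, which sidesteps the prefix question entirely. A sketch against the snippet above (ai_span and usage as shown; an output_tokens field is assumed alongside input_tokens):

# gen_ai.* names follow the OTel GenAI semantic conventions, so backends can
# aggregate token usage without any custom kaapi_/llm_ mapping.
ai_span.set_attribute("gen_ai.usage.input_tokens", usage.input_tokens)
ai_span.set_attribute("gen_ai.usage.output_tokens", usage.output_tokens)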

Comment thread backend/app/core/telemetry.py Outdated
)


def record_celery_task_started(task: object | None) -> None:
Collaborator

are we using this? I can't seem to find a reference to this code

Comment thread backend/app/core/telemetry.py Outdated
_emit_celery_worker_gauges(active, pid)


def record_celery_task_finished(task: object | None, state: str | None) -> None:
Collaborator

are we using this? I can't seem to find a reference to this code

from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

sentry_sdk.init(
Collaborator

this is mostly a repeat of sentry_sdk.init in backend/app/main.py; can we use something smarter?
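
One shape for "something smarter" is a single helper that both entry points call, so DSN and integration wiring live in one place. A sketch with an assumed module path, parameter names, and sample rate:

# app/core/sentry.py (hypothetical location)
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.logging import LoggingIntegration

def init_sentry(
    *, dsn: str, environment: str, extra_integrations: list | None = None
) -> None:
    # Shared defaults for the backend and workers; callers append the
    # FastAPI/Starlette integrations only where they apply.
    integrations: list = [CeleryIntegration(), LoggingIntegration()]
    if extra_integrations:
        integrations.extend(extra_integrations)
    sentry_sdk.init(
        dsn=dsn,
        environment=environment,
        integrations=integrations,
        traces_sample_rate=0.1,  # assumed rate
    )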


async def http_request_logger(request: Request, call_next) -> Response:
    start_time = time.time()
    method = request.method
Collaborator

Got this from using Claude to review; can you check if it makes sense?

backend/app/core/middleware.py:13-14

method = request.method
route = request.url.path

request.url.path is the concrete URL — e.g. /api/v1/collections/123e4567-e89b-12d3-a456-426614174000 — and it's then pushed into OTel
span attributes and Sentry tags at middleware.py:16-25, 31-33, 42-53, 68-77.

Why it's a problem

  • Cardinality blow-up. Every UUID in the path becomes a unique http.route tag value. Sentry bills and indexes tags per unique value;
    your /collections/{id}, /llm/call/{job_id}, /projects/{pid}/... endpoints will fan out to thousands of tag values. Dashboards
    filtering by route become unusable because each request is its own row.
  • PII leakage. Paths can contain user-identifying values — email-like slugs in some routes, callback URLs with query strings, API keys
    in path params on legacy endpoints. sentry_sdk.set_tag("http.route", "/users/akhilesh@…") writes that identifier into the tag index
    and ships it to Sentry regardless of scrubbing rules, which only apply to PII in known fields.
  • It breaks AI Insights / Performance grouping. Sentry's Performance module groups transactions by http.route. With the raw path,
    every UUID gets its own transaction group — the "slowest endpoints" view is meaningless.
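
The concern holds; the usual fix is to tag the matched route template instead of the concrete path. A sketch for FastAPI, assuming it runs after routing (e.g. after call_next in middleware), when Starlette has populated request.scope["route"]; the helper name is invented:

from fastapi import Request

def route_template(request: Request) -> str:
    # After routing, the matched route object sits in the ASGI scope;
    # its .path keeps placeholders such as /collections/{collection_id}.
    route = request.scope.get("route")
    if route is not None:
        return route.path
    # Unmatched requests (404s) fall back to a constant, not the raw path,
    # keeping tag cardinality bounded.
    return "unmatched"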

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
backend/app/celery/celery_app.py (1)

80-123: Hoist the task_postrun flush handler and tighten the signature.

Two small things on initialize_worker_process:

  1. **_ is missing a type annotation, unlike the other signal handlers in this file (**_: object). As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".
  2. Nesting @task_postrun.connect(weak=False) inside another signal handler works (it fires once per forked process), but it makes the task-flush wiring invisible from a module-level read and couples telemetry flushing to the Sentry/OTel init block. Defining it at module level (next to log_pool_status_post) keeps all task_postrun subscribers co-located and removes the closure-lifetime concern that motivated weak=False in the first place.
♻️ Proposed refactor
-@worker_process_init.connect
-def initialize_worker_process(**_) -> None:
+@worker_process_init.connect
+def initialize_worker_process(**_: object) -> None:
     """Initialize each forked Celery worker process.
@@
-    from app.core.telemetry import flush_telemetry, setup_telemetry
-
-    setup_telemetry(service_name="kaapi-celery")
-
-    @task_postrun.connect(weak=False)
-    def _flush_otel_after_task(**_: object) -> None:
-        flush_telemetry()
-
-    import app.services.llm.jobs  # noqa: F401
+    from app.core.telemetry import setup_telemetry
+
+    setup_telemetry(service_name="kaapi-celery")
+
+    import app.services.llm.jobs  # noqa: F401
+
+
+@task_postrun.connect(weak=False)
+def _flush_otel_after_task(**_: object) -> None:
+    from app.core.telemetry import flush_telemetry
+
+    flush_telemetry()

As per coding guidelines: "Always add type hints to all function parameters and return values in Python code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/celery/celery_app.py` around lines 80 - 123, The task_postrun
flush handler currently lives nested inside initialize_worker_process and uses
an untyped **_ parameter; move the @task_postrun.connect(weak=False) subscriber
out to module scope (co-locate it with log_pool_status_post) so task-flush
wiring is visible and not coupled to Sentry/OTel init, and change the handler
signature to def _flush_otel_after_task(**_: object) -> None to add the missing
type hint; the handler should simply call flush_telemetry() and retain
weak=False on the decorator.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/api/routes/collections.py`:
- Around line 107-132: The job record uses deduplicated IDs (unique_documents)
but create_service.start_job is still passed the original request, so the worker
may process duplicates; update the call to create_service.start_job to pass a
request object whose documents field is the deduplicated list (use
unique_documents or set request.documents = unique_documents before the call, or
create a shallow copy of request with documents=[str(doc_id) for doc_id in
unique_documents]) so the worker receives the same deduplicated document list
used to create the CollectionJob (references: unique_documents,
CollectionJobCreate, create_service.start_job).

In `@backend/app/tests/services/collections/test_delete_collection.py`:
- Around line 373-376: The test functions (e.g.,
test_execute_job_local_delete_failure_after_remote_success_marks_failed) are
missing a type annotation for the db fixture; add a parameter type hint db:
Session and ensure you import Session (e.g., from sqlalchemy.orm import Session)
at the top of the test module, and apply the same change to the other new
test(s) referenced (around lines 441-444) so all fixture parameters are properly
typed.

---

Nitpick comments:
In `@backend/app/celery/celery_app.py`:
- Around line 80-123: The task_postrun flush handler currently lives nested
inside initialize_worker_process and uses an untyped **_ parameter; move the
@task_postrun.connect(weak=False) subscriber out to module scope (co-locate it
with log_pool_status_post) so task-flush wiring is visible and not coupled to
Sentry/OTel init, and change the handler signature to def
_flush_otel_after_task(**_: object) -> None to add the missing type hint; the
handler should simply call flush_telemetry() and retain weak=False on the
decorator.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 60326658-4721-4c8c-bd25-5fa0190dceb8

📥 Commits

Reviewing files that changed from the base of the PR and between dea7c4d and 0f1b045.

📒 Files selected for processing (6)
  • backend/app/api/routes/collections.py
  • backend/app/celery/celery_app.py
  • backend/app/core/middleware.py
  • backend/app/services/llm/jobs.py
  • backend/app/tests/services/collections/test_create_collection.py
  • backend/app/tests/services/collections/test_delete_collection.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • backend/app/services/llm/jobs.py
  • backend/app/core/middleware.py

Comment on lines +107 to 132
        unique_documents = list(dict.fromkeys(request.documents))

        collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
        collection_job = collection_job_crud.create(
            CollectionJobCreate(
                action_type=CollectionActionType.CREATE,
                project_id=current_user.project_.id,
                status=CollectionJobStatus.PENDING,
                docs_num=len(unique_documents),
                documents=[str(doc_id) for doc_id in unique_documents],
            )
        )

        # True if both model and instructions were provided in the request body
        with_assistant = bool(
            getattr(request, "model", None) and getattr(request, "instructions", None)
        )

        create_service.start_job(
            db=session,
            request=request,
            collection_job_id=collection_job.id,
            project_id=current_user.project_.id,
            organization_id=current_user.organization_.id,
            with_assistant=with_assistant,
        )

⚠️ Potential issue | 🟠 Major

Pass the deduplicated request to the worker.

Line 107 deduplicates documents for CollectionJobCreate, but Line 127 still dispatches the original request; duplicate document IDs can make the job record report one set while the Celery worker processes another.

🐛 Proposed fix
         unique_documents = list(dict.fromkeys(request.documents))
+        normalized_request = request.model_copy(
+            update={"documents": unique_documents}
+        )
 
         collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
         collection_job = collection_job_crud.create(
@@
         create_service.start_job(
             db=session,
-            request=request,
+            request=normalized_request,
             collection_job_id=collection_job.id,
             project_id=current_user.project_.id,
             organization_id=current_user.organization_.id,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
        unique_documents = list(dict.fromkeys(request.documents))
        normalized_request = request.model_copy(
            update={"documents": unique_documents}
        )

        collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
        collection_job = collection_job_crud.create(
            CollectionJobCreate(
                action_type=CollectionActionType.CREATE,
                project_id=current_user.project_.id,
                status=CollectionJobStatus.PENDING,
                docs_num=len(unique_documents),
                documents=[str(doc_id) for doc_id in unique_documents],
            )
        )

        # True if both model and instructions were provided in the request body
        with_assistant = bool(
            getattr(request, "model", None) and getattr(request, "instructions", None)
        )

        create_service.start_job(
            db=session,
            request=normalized_request,
            collection_job_id=collection_job.id,
            project_id=current_user.project_.id,
            organization_id=current_user.organization_.id,
            with_assistant=with_assistant,
        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/routes/collections.py` around lines 107 - 132, The job record
uses deduplicated IDs (unique_documents) but create_service.start_job is still
passed the original request, so the worker may process duplicates; update the
call to create_service.start_job to pass a request object whose documents field
is the deduplicated list (use unique_documents or set request.documents =
unique_documents before the call, or create a shallow copy of request with
documents=[str(doc_id) for doc_id in unique_documents]) so the worker receives
the same deduplicated document list used to create the CollectionJob
(references: unique_documents, CollectionJobCreate, create_service.start_job).

Comment on lines +373 to +376
@patch("app.services.collections.delete_collection.get_llm_provider")
def test_execute_job_local_delete_failure_after_remote_success_marks_failed(
    mock_get_llm_provider: MagicMock, db
) -> None:

⚠️ Potential issue | 🟡 Minor

Add type hints for the new db fixture parameters.

Both newly added test functions omit the Session type on db.

Proposed fix
 def test_execute_job_local_delete_failure_after_remote_success_marks_failed(
-    mock_get_llm_provider: MagicMock, db
+    mock_get_llm_provider: MagicMock, db: Session
 ) -> None:
@@
 def test_execute_job_provider_factory_failure_marks_job_failed(
-    mock_get_llm_provider: MagicMock, db
+    mock_get_llm_provider: MagicMock, db: Session
 ) -> None:

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

Also applies to: 441-444

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/tests/services/collections/test_delete_collection.py` around
lines 373 - 376, The test functions (e.g.,
test_execute_job_local_delete_failure_after_remote_success_marks_failed) are
missing a type annotation for the db fixture; add a parameter type hint db:
Session and ensure you import Session (e.g., from sqlalchemy.orm import Session)
at the top of the test module, and apply the same change to the other new
test(s) referenced (around lines 441-444) so all fixture parameters are properly
typed.

@vprashrex vprashrex merged commit 0220157 into main Apr 21, 2026
2 of 3 checks passed
@vprashrex vprashrex deleted the feature/observability-fix branch April 21, 2026 05:26
@github-project-automation github-project-automation Bot moved this from In Review to Closed in Kaapi-dev Apr 21, 2026
@AkhileshNegi AkhileshNegi changed the title Enhance telemetry and logging for collection and LLM job services Observability: Telemetry and logging for collection and LLM job services Apr 21, 2026

Labels

enhancement New feature or request

Projects

Status: Closed

Development

Successfully merging this pull request may close these issues.

Observability: Testing new monitoring tools

2 participants